导读
// 抢取订单函数
public synchronized voidgrabOrder(Long orderId, Long userId) {
// 获取订单信息
OrderDO order = orderDAO.get(orderId);
if (Objects.isNull(order)) {
thrownew BizRuntimeException(String.format("订单(%s)不存在", orderId));
}
// 检查订单状态
if (!Objects.equals(order.getStatus, OrderStatus.WAITING_TO_GRAB.getValue())) {
thrownew BizRuntimeException(String.format("订单(%s)已被抢", orderId));
}
// 设置订单被抢
orderDAO.setGrabed(orderId, userId);
}
// 抢取订单函数
publicvoidgrabOrder(Long orderId, Long userId) {
Long lockId = orderDistributedLock.lock(orderId);
try {
grabOrderWithoutLock(orderId, userId);
} finally {
orderDistributedLock.unlock(orderId, lockId);
}
}
// 不带锁的抢取订单函数
privatevoidgrabOrderWithoutLock(Long orderId, Long userId) {
// 获取订单信息
OrderDO order = orderDAO.get(orderId);
if (Objects.isNull(order)) {
thrownew BizRuntimeException(String.format("订单(%s)不存在", orderId));
}
// 检查订单状态
if (!Objects.equals(order.getStatus, OrderStatus.WAITING_TO_GRAB.getValue())) {
thrownew BizRuntimeException(String.format("订单(%s)已被抢", orderId));
}
// 设置订单被抢
orderDAO.setGrabed(orderId, userId);
}
优化后的代码,在调用函数 grabOrderWithoutLock(不带锁的抢取订单)前后,利用分布式锁 orderDistributedLock(订单分布式锁)进行加锁和释放锁,跟单机版的 synchronized 关键字加锁效果基本一样。
分布式系统的优缺点
分布式系统(Distributed System)是支持分布式处理的软件系统,是由通信网络互联的多处理机体系结构上执行任务的系统,包括分布式操作系统、分布式程序设计语言及其编译系统、分布式文件系统分布式数据库系统等。
分布式系统的优点:
可靠性、高容错性:
一台服务器的崩溃,不会影响其它服务器,其它服务器仍能提供服务。
可扩展性:
如果系统服务能力不足,可以水平扩展更多服务器。
灵活性:
可以很容易的安装、实施、扩容和升级系统。
性能高:
拥有多台服务器的计算能力,比单台服务器处理速度更快。
性价比高:
分布式系统对服务器硬件要求很低,可以选用廉价服务器搭建分布式集群,从而得到更好的性价比。
分布式系统的缺点:
排查难度高:
由于系统分布在多台服务器上,故障排查和问题诊断难度较高。
软件支持少:
分布式系统解决方案的软件支持较少。
建设成本高:
需要多台服务器搭建分布式系统。
曾经有不少的朋友咨询我:"找外包做移动应用,需要注意哪些事项?"
首先,确定是否需要用分布式系统。软件预算有多少?预计用户量有多少?预计访问量有多少?是否只是业务前期试水版?单台服务器能否解决?是否接收短时间宕机?……如果综合考虑,单机版系统就可以解决的,那就不要采用分布式系统了。因为单机版系统和分布式系统的差别很大,相应的软件研发成本的差别也很大。
其次,确定是否真正的分布式系统。分布式系统最大的特点,就是当系统服务能力不足时,能够通过水平扩展的方式,通过增加服务器来增加服务能力。然而,单机版系统是不支持水平扩展的,强行扩展就会引起一系列数据问题。由于单机版系统和分布式系统的研发成本差别较大,市面上的外包团队大多用单机版系统代替分布式系统交付。
那么,如何确定你的系统是真正意义上的分布式系统呢?从软件上来说,是否采用了分布式软件解决方案;从硬件上来说,是否采用了分布式硬件部署方案。
分布式软件解决方案
作为一个合格的分布式系统,需要根据实际需求采用相应的分布式软件解决方案。
分布式锁
分布式锁是单机锁的一种扩展,主要是为了锁住分布式系统中的物理块或逻辑块,用以此保证不同服务之间的逻辑和数据的一致性。
目前,主流的分布式锁实现方式有3种:
基于数据库实现的分布式锁;
基于Redis实现的分布式锁;
基于Zookeeper实现的分布式锁。
分布式消息
分布式消息中间件是支持在分布式系统中发送和接受消息的软件基础设施。常见的分布式消息中间件有 ActiveMQ、RabbitMQ、Kafka、MetaQ 等。
MetaQ(全称Metamorphosis)是一个高性能、高可用、可扩展的分布式消息中间件,思路起源于 LinkedIn 的 Kafka ,但并不是 Kafka 的一个拷贝。MetaQ 具有消息存储顺序写、吞吐量大和支持本地和 XA 事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景。
数据库分片分组
针对大数据量的数据库,一般会采用"分片分组"策略:
分片(shard):主要解决扩展性问题,属于水平拆分。引入分片,就引入了数据路由和分区键的概念。其中,分表解决的是数据量过大的问题,分库解决的是数据库性能瓶颈的问题。
分组(group):主要解决可用性问题,通过主从复制的方式实现,并提供读写分离策略用以提高数据库性能。
分布式计算
分布式计算( Distributed computing )是一种"把需要进行大量计算的工程数据分割成小块,由多台计算机分别计算;在上传运算结果后,将结果统一合并得出数据结论"的科学。
当前的高性能服务器在处理海量数据时,其计算能力、内存容量等指标都远远无法达到要求。在大数据时代,工程师采用廉价的服务器组成分布式服务集群,以集群协作的方式完成海量数据的处理,从而解决单台服务器在计算与存储上的瓶颈。 Hadoop、Storm以 及 Spark 是常用的分布式计算中间件,Hadoop 是对非实时数据做批量处理的中间件,Storm 和 Spark 是对实时数据做流式处理的中间件。
除此之外,还有更多的分布式软件解决方案,这里就不再一一介绍了。
分布式硬件部署方案
介绍完服务端的分布式软件解决方案,就不得不介绍一下服务端的分布式硬件部署方案。这里,只画出了服务端常见的接口服务器、MySQL 数据库、Redis 缓存,而忽略了其它的云存储服务、消息队列服务、日志系统服务……
一般单机版部署方案
架构说明:
只有1台接口服务器、1 个 MySQL 数据库、1 个可选 Redis 缓存,可能都部署在同一台服务器上。
适用范围:
适用于演示环境、测试环境以及不怕宕机且日 PV 在 5 万以内的小型商业应用。
中小型分布式硬件部署方案
架构说明:
通过 SLB/Nginx 组成一个负载均衡的接口服务器集群,MySQL 数据库和 Redis 缓存采用了一主一备(或多备)的部署方式。
适用范围:
适用于日 PV 在 500 万以内的中小型商业应用。
大型分布式硬件部署方案
架构说明:
通过 SLB/Nginx 组成一个负载均衡的接口服务器集群,利用分片分组策略组成一个 MySQL 数据库集群和Redis缓存集群。
适用范围:
适用于日 PV 在 500 万以上的大型商业应用。
多线程使用不正确
多线程最主要目的就是"最大限度地利用 CPU 资源",可以把串行过程变成并行过程,从而提高了程序的执行效率。
一个慢接口案例
假设在用户登录时,如果是新用户,需要创建用户信息,并发放新用户优惠券。例子代码如下:
// 登录函数(示意写法)
public UserVO login(String phoneNumber, String verifyCode) {
// 检查验证码
if (!checkVerifyCode(phoneNumber, verifyCode)) {
thrownew ExampleException("验证码错误");
}
// 检查用户存在
UserDO user = userDAO.getByPhoneNumber(phoneNumber);
if (Objects.nonNull(user)) {
return transUser(user);
}
// 创建新用户
return createNewUser(user);
}
// 创建新用户函数
private UserVO createNewUser(String phoneNumber) {
// 创建新用户
UserDO user = new UserDO();
...
userDAO.insert(user);
// 绑定优惠券
couponService.bindCoupon(user.getId(), CouponType.NEW_USER);
// 返回新用户
return transUser(user);
}
// 创建新用户函数
private UserVO createNewUser(String phoneNumber) {
// 创建新用户
UserDO user = new UserDO();
...
userDAO.insert(user);
// 绑定优惠券
executorService.execute(()->couponService.bindCoupon(user.getId(), CouponType.NEW_USER));
// 返回新用户
return transUser(user);
}
// 创建新用户函数
private UserVO createNewUser(String phoneNumber) {
// 创建新用户
UserDO user = new UserDO();
...
userDAO.insert(user);
// 发送优惠券消息
Long userId = user.getId();
CouponMessageDataVO data = new CouponMessageDataVO();
data.setUserId(userId);
data.setCouponType(CouponType.NEW_USER);
Message message = new Message(TOPIC, TAG, userId, JSON.toJSONBytes(data));
SendResult result = metaqTemplate.sendMessage(message);
if (!Objects.equals(result, SendStatus.SEND_OK)) {
log.error("发送用户({})绑定优惠券消息失败:{}", userId, JSON.toJSONString(result));
}
// 返回新用户
return transUser(user);
}
// 优惠券服务类
@Slf4j
@Service
publicclassCouponServiceextendsDefaultMessageListener<String> {
// 消息处理函数
@Override
@Transactional(rollbackFor = Exception.class)
public void onReceiveMessages(MetaqMessage<String> message) {
// 获取消息体
String body = message.getBody();
if (StringUtils.isBlank(body)) {
log.warn("获取消息({})体为空", message.getId());
return;
}
// 解析消息数据
CouponMessageDataVO data = JSON.parseObject(body, CouponMessageDataVO.class);
if (Objects.isNull(data)) {
log.warn("解析消息({})体为空", message.getId());
return;
}
// 绑定优惠券
bindCoupon(data.getUserId(), data.getCouponType());
}
}
如果系统发生重启或崩溃,导致消息处理函数执行失败,不会确认消息已消费;由于 MetaQ 支持多服务订阅同一队列,该消息可以转到别的服务进行消费,亦或等到本服务恢复正常后再进行消费。
消费者可多服务、多线程进行消费消息,即便消息处理时间较长,也不容易引起消息积压;即便引起消息积压,也可以通过扩充服务实例的方式解决。
如果需要重新消费该消息,只需要在 MetaQ 管理平台上点击"消息验证"即可。
/** 完成采购动作函数(此处省去获取采购单/验证状态/锁定采购单等逻辑) */
publicvoidfinishPurchase(PurchaseOrder order) {
// 完成相关处理
......
// 回流采购单(调用HTTP接口)
backflowPurchaseOrder(order);
// 设置完成状态
purchaseOrderDAO.setStatus(order.getId(), PurchaseOrderStatus.FINISHED.getValue());
}
该函数可能耗费时间较长,导致完成采购接口成为慢接口;
该函数可能失败抛出异常,导致客户调用完成采购接口失败。
/** 完成采购动作函数(此处省去获取采购单/验证状态/锁定采购单等逻辑) */
publicvoidfinishPurchase(PurchaseOrder order) {
// 完成相关处理
......
// 设置完成状态
purchaseOrderDAO.setStatus(order.getId(), PurchaseOrderStatus.FINISHED.getValue());
}
/** 执行回流动作函数(此处省去获取采购单/验证状态/锁定采购单等逻辑) */
publicvoidexecuteBackflow(PurchaseOrder order) {
// 回流采购单(调用HTTP接口)
backflowPurchaseOrder(order);
// 设置回流状态
purchaseOrderDAO.setStatus(order.getId(), PurchaseOrderStatus.BACKFLOWED.getValue());
}
状态必须是一个持久状态,而不能是一个临时状态;
终结状态不能是中间状态,不能继续进行流程流转;
状态划分合理,不要把多个状态强制合并为一个状态;
状态尽量精简,同一状态的不同情况可以用其它字段表示。
每个动作执行前,必须检查当前状态和触发动作状态的一致性;
状态机的状态更改,只能通过动作进行,其它操作都是不符合规范的;
需要添加分布式锁保证动作的原子性,添加数据库事务保证数据的一致性;
类似的动作(比如操作用户、请求参数、动作含义等)可以合并为一个动作,并根据动作执行结果转向不同的状态。
/** 执行回流动作函数(此处省去获取采购单/验证状态/锁定采购单等逻辑) */
publicvoidexecuteBackflow(PurchaseOrder order) {
// 完成原始采购单
rawPurchaseOrderDAO.setStatus(order.getRawId(), RawPurchaseOrderStatus.FINISHED.getValue());
// 设置回流状态
purchaseOrderDAO.setStatus(order.getId(), PurchaseOrderStatus.BACKFLOWED.getValue());
}
直接暴露数据库表,容易产生数据安全问题;
多个系统操作同一数据库表,容易造成数据库表数据混乱;
操作同一个数据库表的代码,分布在不同的系统中,不便于管理和维护;
具有数据库表这样的强关联,无法实现系统间的隔离和解耦。
/** 采购单服务接口 */
publicinterface PurchaseOrderService {
/** 完成采购单函数 */
publicvoid finishPurchaseOrder(Long orderId);
}
/** 采购单服务实现 */
@Service("purchaseOrderService")
publicclass PurchaseOrderServiceImpl implements PurchaseOrderService {
/** 完成采购单函数 */
@Override
@Transactional(rollbackFor = Exception.class)
publicvoid finishPurchaseOrder(Long orderId) {
// 相关处理
...
// 完成采购单
purchaseOrderService.finishPurchaseOrder(order.getRawId());
}
}
/** 执行回流动作函数(此处省去获取采购单/验证状态/锁定采购单等逻辑) */
publicvoidexecuteBackflow(PurchaseOrder order) {
// 完成采购单
purchaseOrderService.finishPurchaseOrder(order.getRawId());
// 设置回流状态
purchaseOrderDAO.setStatus(order.getId(), PurchaseOrderStatus.BACKFLOWED.getValue());
}
其中,purchaseOrderService(采购单服务)为库管系统PurchaseOrderService(采购单服务)在采购系统中的Dubbo服务客户端存根,通过该服务调用库管系统的服务接口函数finishPurchaseOrder(完成采购单函数)。
这样,采购系统和库管系统自己的强关联,通过Dubbo就简单地实现了系统隔离和解耦。当然,除了采用Dubbo接口外,还可以采用HTTPS、HSF、WebService等同步接口调用方式,也可以采用MetaQ等异步消息通知方式。
常见系统间交互协议
同步接口调用
同步接口调用是以一种阻塞式的接口调用机制。常见的交互协议有:
HTTP/HTTPS接口;
WebService接口;
Dubbo/HSF接口;
CORBA接口。
异步消息通知
异步消息通知是一种通知式的信息交互机制。当系统发生某种事件时,会主动通知相应的系统。常见的交互协议有:
MetaQ的消息通知;
CORBA消息通知。
常见系统间交互方式
请求-应答
适用范围:
适合于简单的耗时较短的接口同步调用场景,比如 Dubbo 接口同步调用。
通知-确认
适用范围:
适合于简单的异步消息通知场景,比如 MetaQ 消息通知。
请求-应答-查询-返回
适用范围:
适合于复杂的耗时较长的接口同步调用场景,比如提交作业任务并定期查询任务结果。
请求-应答-回调
适用范围:
适合于复杂的耗时较长的接口同步调用和异步回调相结合的场景,比如支付宝的订单支付。
请求-应答-通知-确认
适用范围:
适合于复杂的耗时较长的接口同步调用和异步消息通知相结合的场景,比如提交作业任务并等待完成消息通知。
通知-确认-通知-确认
适用范围:
适合于复杂的耗时较长的异步消息通知场景。
数据查询不分页
在数据查询时,由于未能对未来数据量做出正确的预估,很多情况下都没有考虑数据的分页查询。
普通查询案例
以下是查询过期订单的代码:
/** 订单DAO接口 */
publicinterfaceOrderDAO {
/** 查询过期订单函数 */
@Select("select * from t_order where status = 5 and gmt_create < date_sub(current_timestamp, interval 30 day)")
public List<OrderDO> queryTimeout();
}
/** 订单服务接口 */
publicinterfaceOrderService {
/** 查询过期订单函数 */
public List<OrderVO> queryTimeout();
}
数据量太大,导致服务端的内存溢出;
数据量太大,导致查询接口超时、返回数据超时等;
数据量太大,导致客户端的内存溢出。
/** 订单DAO接口 */
publicinterfaceOrderDAO {
/** 查询过期订单函数 */
@Select("select * from t_order where status = 5 and gmt_create < date_sub(current_timestamp, interval 30 day) limit 0, #{maxCount}")
public List<OrderDO> queryTimeout(@Param("maxCount") Integer maxCount);
}
/** 订单服务接口 */
publicinterfaceOrderService {
/** 查询过期订单函数 */
public List<OrderVO> queryTimeout(Integer maxCount);
}
/** 订单DAO接口 */
publicinterfaceOrderDAO{
/** 统计过期订单函数 */
@Select("select count(*) from t_order where status = 5 and gmt_create < date_sub(current_timestamp, interval 30 day)")
publicLong countTimeout();
/** 查询过期订单函数 */
@Select("select * from t_order where status = 5 and gmt_create < date_sub(current_timestamp, interval 30 day) limit #{startIndex}, #{pageSize}")
public List<OrderDO> queryTimeout(@Param("startIndex")Long startIndex, @Param("pageSize") Integer pageSize);
}
/** 订单服务接口 */
publicinterfaceOrderService{
/** 查询过期订单函数 */
public PageData<OrderVO> queryTimeout(Long startIndex, Integer pageSize);
}
/** 订单DAO接口 */
publicinterfaceOrderDAO{
/** 查询过期订单函数 */
@Select("select * from t_order where status = 5 and gmt_create < date_sub(current_timestamp, interval 30 day) limit #{startIndex}, #{pageSize}")
public List<OrderDO> queryTimeout(@Param("startIndex")Long startIndex, @Param("pageSize") Integer pageSize);
/** 设置订单超时关闭 */
@Update("update t_order set status = 10 where id = #{orderId} and status = 5")
publicLong setTimeoutClosed(@Param("orderId")Long orderId)
}
/** 关闭过期订单作业类 */
publicclassCloseTimeoutOrderJobextendsJob{
/** 分页数量 */
private static final int PAGE_COUNT = 100;
/** 分页大小 */
private static final int PAGE_SIZE = 1000;
/** 作业执行函数 */
@Override
public void execute() {
for (int i = 0; i < PAGE_COUNT; i++) {
// 查询处理订单
List<OrderDO> orderList = orderDAO.queryTimeout(i * PAGE_COUNT, PAGE_SIZE);
if (OrderDO order : orderList) {
// 进行超时关闭
......
orderDAO.setTimeoutClosed(order.getId());
}
// 检查处理完毕
if(orderList.size() < PAGE_SIZE) {
break;
}
}
}
}
/** 订单DAO接口 */
publicinterface OrderDAO {
/** 查询过期订单函数 */
@Select("select * from t_order where status = 5 and gmt_create < date_sub(current_timestamp, interval 30 day) limit 0, #{maxCount}")
public List<OrderDO> queryTimeout(@Param("maxCount") Integer maxCount);
/** 设置订单超时关闭 */
@Update("update t_order set status = 10 where id = #{orderId} and status = 5")
public Long setTimeoutClosed(@Param("orderId") Long orderId)
}
/** 关闭过期订单作业(定时作业) */
publicclass CloseTimeoutOrderJob extends Job {
/** 分页数量 */
privatestatic final int PAGE_COUNT = 100;
/** 分页大小 */
privatestatic final int PAGE_SIZE = 1000;
/** 作业执行函数 */
@Override
publicvoid execute() {
for (int i = 0; i < PAGE_COUNT; i++) {
// 查询处理订单
List<OrderDO> orderList = orderDAO.queryTimeout(PAGE_SIZE);
if (OrderDO order : orderList) {
// 进行超时关闭
......
orderDAO.setTimeoutClosed(order.getId());
}
// 检查处理完毕
if(orderList.size() < PAGE_SIZE) {
break;
}
}
}
}
本文作者:
陈昌毅,花名常意,高德地图技术专家,2018 年加入阿里巴巴,一直从事地图数据采集的相关工作。
本文缩略图:icon by 耳铃
Tips:
# 点下“在看”❤️
# 然后,公众号对话框内发送“打起精神”,试试手气??
# 本期奖品是来自淘宝心选的蒸汽眼罩。